from sqlite_utils import cli, Database
from sqlite_utils.db import Index, ForeignKey
from click.testing import CliRunner
from pathlib import Path
import subprocess
import sys
import json
import os
import pytest
import textwrap


def write_json(file_path, data):
    """Serialize ``data`` as JSON into ``file_path``, overwriting any existing content."""
    with open(file_path, mode="w") as handle:
        json.dump(data, handle)


def _supports_pragma_function_list():
    """Return True if ``pragma_function_list()`` can be queried on this SQLite build.

    The probe runs against a throwaway in-memory database. Any exception
    raised by the query is treated as "not supported".
    """
    db = Database(memory=True)
    try:
        db.execute("select * from pragma_function_list()")
    except Exception:
        return False
    else:
        return True
    finally:
        # Release the probe connection explicitly instead of leaving it
        # open until garbage collection.
        db.conn.close()


def _has_compiled_ext():
    """Report whether an ``ext.dylib``, ``ext.so`` or ``ext.dll`` file sits next to this module."""
    here = Path(__file__).parent
    return any(
        (here / f"ext.{suffix}").is_file() for suffix in ("dylib", "so", "dll")
    )


# Path of the "ext" file beside this module, without a platform-specific suffix.
COMPILED_EXTENSION_PATH = str(Path(__file__).with_name("ext"))


@pytest.mark.parametrize(
    "options",
    (
        ["-h"],
        ["--help"],
        ["insert", "-h"],
        ["insert", "--help"],
    ),
)
def test_help(options):
    """Both help flags print usage text for the top-level command and for ``insert``."""
    outcome = CliRunner().invoke(cli.cli, options)
    help_text = outcome.output
    assert outcome.exit_code == 0
    assert help_text.startswith("Usage: ")
    assert "-h, --help" in help_text


def test_tables(db_path):
    """``tables`` prints the tables of the database at ``db_path`` as JSON."""
    listing = CliRunner().invoke(cli.cli, ["tables", db_path], catch_exceptions=False)
    expected = '[{"table": "Gosh"},\n {"table": "Gosh2"}]'
    assert listing.output.strip() == expected


def test_views(db_path):
    """``views --table --schema`` renders each view and its CREATE statement as a text table."""
    Database(db_path).create_view("hello", "select sqlite_version()")
    cli_args = ["views", db_path, "--table", "--schema"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    expected_lines = [
        "view    schema",
        "------  ----------------------------------------------",
        'hello   CREATE VIEW "hello" AS select sqlite_version()',
    ]
    assert outcome.output.strip() == "\n".join(expected_lines)


def test_tables_fts4(db_path):
    """``tables --fts4`` lists the FTS4 table created for ``Gosh``."""
    Database(db_path)["Gosh"].enable_fts(["c2"], fts_version="FTS4")
    listing = CliRunner().invoke(cli.cli, ["tables", "--fts4", db_path])
    assert listing.output.strip() == '[{"table": "Gosh_fts"}]'


def test_tables_fts5(db_path):
    """``tables --fts5`` lists the FTS5 table created for ``Gosh``."""
    Database(db_path)["Gosh"].enable_fts(["c2"], fts_version="FTS5")
    listing = CliRunner().invoke(cli.cli, ["tables", "--fts5", db_path])
    assert listing.output.strip() == '[{"table": "Gosh_fts"}]'


def test_tables_counts_and_columns(db_path):
    """``tables --counts --columns`` adds row counts and column names to the JSON output."""
    new_rows = [{"id": n, "age": n + 1} for n in range(30)]
    db = Database(db_path)
    with db.conn:
        db["lots"].insert_all(new_rows)
    outcome = CliRunner().invoke(cli.cli, ["tables", "--counts", "--columns", db_path])
    expected_rows = [
        '{"table": "Gosh", "count": 0, "columns": ["c1", "c2", "c3"]}',
        '{"table": "Gosh2", "count": 0, "columns": ["c1", "c2", "c3"]}',
        '{"table": "lots", "count": 30, "columns": ["id", "age"]}',
    ]
    # Rows are separated by a comma, a newline and a single leading space.
    assert outcome.output.strip() == "[" + ",\n ".join(expected_rows) + "]"


@pytest.mark.parametrize(
    "format,expected",
    [
        (
            "--csv",
            (
                "table,count,columns\n"
                'Gosh,0,"c1\n'
                "c2\n"
                'c3"\n'
                'Gosh2,0,"c1\n'
                "c2\n"
                'c3"\n'
                'lots,30,"id\n'
                'age"'
            ),
        ),
        (
            "--tsv",
            "table\tcount\tcolumns\nGosh\t0\t['c1', 'c2', 'c3']\nGosh2\t0\t['c1', 'c2', 'c3']\nlots\t30\t['id', 'age']",
        ),
    ],
)
def test_tables_counts_and_columns_csv(db_path, format, expected):
    """``tables --counts --columns`` output in CSV and TSV form."""
    new_rows = [{"id": n, "age": n + 1} for n in range(30)]
    db = Database(db_path)
    with db.conn:
        db["lots"].insert_all(new_rows)
    cli_args = ["tables", "--counts", "--columns", format, db_path]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    # Carriage returns are removed before comparing against the expected text.
    normalised = outcome.output.strip().replace("\r", "")
    assert normalised == expected


def test_tables_schema(db_path):
    """``tables --schema`` includes each table's CREATE TABLE statement in the JSON output."""
    new_rows = [{"id": n, "age": n + 1} for n in range(30)]
    db = Database(db_path)
    with db.conn:
        db["lots"].insert_all(new_rows)
    outcome = CliRunner().invoke(cli.cli, ["tables", "--schema", db_path])
    expected = (
        '[{"table": "Gosh", "schema": "CREATE TABLE Gosh (c1 text, c2 text, c3 text)"},\n'
        ' {"table": "Gosh2", "schema": "CREATE TABLE Gosh2 (c1 text, c2 text, c3 text)"},\n'
        ' {"table": "lots", "schema": "CREATE TABLE \\"lots\\" (\\n   \\"id\\" INTEGER,\\n   \\"age\\" INTEGER\\n)"}]'
    )
    assert outcome.output.strip() == expected


@pytest.mark.parametrize(
    "options,expected",
    [
        (
            ["--fmt", "simple"],
            (
                "c1     c2     c3\n"
                "-----  -----  ----------\n"
                "verb0  noun0  adjective0\n"
                "verb1  noun1  adjective1\n"
                "verb2  noun2  adjective2\n"
                "verb3  noun3  adjective3"
            ),
        ),
        (
            ["-t"],
            (
                "c1     c2     c3\n"
                "-----  -----  ----------\n"
                "verb0  noun0  adjective0\n"
                "verb1  noun1  adjective1\n"
                "verb2  noun2  adjective2\n"
                "verb3  noun3  adjective3"
            ),
        ),
        (
            ["--fmt", "rst"],
            (
                "=====  =====  ==========\n"
                "c1     c2     c3\n"
                "=====  =====  ==========\n"
                "verb0  noun0  adjective0\n"
                "verb1  noun1  adjective1\n"
                "verb2  noun2  adjective2\n"
                "verb3  noun3  adjective3\n"
                "=====  =====  =========="
            ),
        ),
    ],
)
def test_output_table(db_path, options, expected):
    """``rows`` renders a table in the text format selected by ``options``."""
    sample_rows = [
        {"c1": f"verb{n}", "c2": f"noun{n}", "c3": f"adjective{n}"}
        for n in range(4)
    ]
    db = Database(db_path)
    with db.conn:
        db["rows"].insert_all(sample_rows)
    cli_args = ["rows", db_path, "rows", *options]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    assert outcome.output.strip() == expected


def test_create_index(db_path):
    """``create-index`` with default and custom names, a unique two-column index, and repeat runs."""

    def create_index(*extra):
        # Fresh runner per call, always targeting the same database file.
        return CliRunner().invoke(cli.cli, ["create-index", db_path, *extra])

    db = Database(db_path)
    assert db["Gosh"].indexes == []

    assert create_index("Gosh", "c1").exit_code == 0
    assert db["Gosh"].indexes == [
        Index(
            seq=0, name="idx_Gosh_c1", unique=0, origin="c", partial=0, columns=["c1"]
        )
    ]

    # A custom name can be supplied with --name
    assert create_index("Gosh", "c2", "--name", "blah").exit_code == 0
    assert db["Gosh"].indexes == [
        Index(seq=0, name="blah", unique=0, origin="c", partial=0, columns=["c2"]),
        Index(
            seq=1, name="idx_Gosh_c1", unique=0, origin="c", partial=0, columns=["c1"]
        ),
    ]

    # Two-column unique index on the other table
    unique_args = ("Gosh2", "c1", "c2", "--unique")
    assert create_index(*unique_args).exit_code == 0
    assert db["Gosh2"].indexes == [
        Index(
            seq=0,
            name="idx_Gosh2_c1_c2",
            unique=1,
            origin="c",
            partial=0,
            columns=["c1", "c2"],
        )
    ]

    # Creating the same index a second time is an error...
    assert create_index(*unique_args).exit_code != 0
    # ... unless --if-not-exists or --ignore is passed
    for option in ("--if-not-exists", "--ignore"):
        assert create_index(*unique_args, option).exit_code == 0


def test_create_index_analyze(db_path):
    """``create-index --analyze`` leaves a ``sqlite_stat1`` table in the database."""
    db = Database(db_path)
    assert "sqlite_stat1" not in db.table_names()
    assert db["Gosh"].indexes == []
    cli_args = ["create-index", db_path, "Gosh", "c1", "--analyze"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    assert "sqlite_stat1" in db.table_names()


def test_create_index_desc(db_path):
    """A column given as ``-c1`` (after ``--``) produces a descending index."""
    db = Database(db_path)
    assert db["Gosh"].indexes == []
    cli_args = ["create-index", db_path, "Gosh", "--", "-c1"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    index_row = db.execute(
        "select sql from sqlite_master where type='index'"
    ).fetchone()
    assert index_row[0] == 'CREATE INDEX "idx_Gosh_c1"\n    ON "Gosh" ("c1" desc)'


@pytest.mark.parametrize(
    "col_name,col_type,expected_schema",
    (
        ("text", "TEXT", 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "text" TEXT)'),
        ("text", "str", 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "text" TEXT)'),
        ("text", "STR", 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "text" TEXT)'),
        (
            "integer",
            "INTEGER",
            'CREATE TABLE "dogs" (\n   "name" TEXT\n, "integer" INTEGER)',
        ),
        (
            "integer",
            "int",
            'CREATE TABLE "dogs" (\n   "name" TEXT\n, "integer" INTEGER)',
        ),
        ("float", "FLOAT", 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "float" REAL)'),
        ("blob", "blob", 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "blob" BLOB)'),
        ("blob", "BLOB", 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "blob" BLOB)'),
        ("blob", "bytes", 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "blob" BLOB)'),
        ("blob", "BYTES", 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "blob" BLOB)'),
        ("default", None, 'CREATE TABLE "dogs" (\n   "name" TEXT\n, "default" TEXT)'),
    ),
)
def test_add_column(db_path, col_name, col_type, expected_schema):
    """``add-column`` maps each accepted type name (or no type) to the expected schema."""
    db = Database(db_path)
    db.create_table("dogs", {"name": str})
    assert db["dogs"].schema == 'CREATE TABLE "dogs" (\n   "name" TEXT\n)'
    # The type argument is optional on the command line.
    type_args = [] if col_type is None else [col_type]
    cli_args = ["add-column", db_path, "dogs", col_name, *type_args]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    assert db["dogs"].schema == expected_schema


@pytest.mark.parametrize("ignore", (True, False))
def test_add_column_ignore(db_path, ignore):
    """Adding an existing column fails unless ``--ignore`` is given."""
    db = Database(db_path)
    db.create_table("dogs", {"name": str})
    cli_args = ["add-column", db_path, "dogs", "name"]
    if ignore:
        cli_args.append("--ignore")
    outcome = CliRunner().invoke(cli.cli, cli_args)
    if not ignore:
        assert outcome.exit_code == 1
        assert outcome.output == "Error: duplicate column name: name\n"
    else:
        assert outcome.exit_code == 0


def test_add_column_not_null_default(db_path):
    """``--not-null-default`` adds NOT NULL DEFAULT with the single quote in the value doubled."""
    db = Database(db_path)
    db.create_table("dogs", {"name": str})
    assert db["dogs"].schema == 'CREATE TABLE "dogs" (\n   "name" TEXT\n)'
    cli_args = ["add-column", db_path, "dogs", "nickname"]
    cli_args += ["--not-null-default", "dogs'dawg"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    expected_schema = (
        'CREATE TABLE "dogs" (\n'
        '   "name" TEXT\n'
        ", \"nickname\" TEXT NOT NULL DEFAULT 'dogs''dawg')"
    )
    assert db["dogs"].schema == expected_schema


@pytest.mark.parametrize(
    "args,assert_message",
    (
        (
            ["books", "author_id", "authors", "id"],
            "Explicit other_table and other_column",
        ),
        (["books", "author_id", "authors"], "Explicit other_table, guess other_column"),
        (["books", "author_id"], "Automatically guess other_table and other_column"),
    ),
)
def test_add_foreign_key(db_path, args, assert_message):
    """``add-foreign-key`` with explicit or omitted targets, plus duplicate and bad-column errors."""

    def add_fk(*fk_args):
        # Fresh runner per call, always targeting the same database file.
        return CliRunner().invoke(cli.cli, ["add-foreign-key", db_path, *fk_args])

    db = Database(db_path)
    author_rows = [{"id": 1, "name": "Sally"}, {"id": 2, "name": "Asheesh"}]
    book_rows = [
        {"title": "Hedgehogs of the world", "author_id": 1},
        {"title": "How to train your wolf", "author_id": 2},
    ]
    db["authors"].insert_all(author_rows, pk="id")
    db["books"].insert_all(book_rows)

    assert add_fk(*args).exit_code == 0, assert_message
    expected_key = ForeignKey(
        table="books", column="author_id", other_table="authors", other_column="id"
    )
    assert db["books"].foreign_keys == [expected_key]

    fully_specified = ("books", "author_id", "authors", "id")

    # Adding the same key a second time is an error
    duplicate = add_fk(*fully_specified)
    assert duplicate.exit_code != 0
    assert (
        duplicate.output.strip()
        == "Error: Foreign key already exists for author_id => authors.id"
    )

    # --ignore suppresses that error
    ignored = add_fk(*fully_specified, "--ignore")
    assert ignored.exit_code == 0

    # Pointing at a column that does not exist is an error
    invalid = add_fk("books", "author_id", "authors", "bad")
    assert invalid.exit_code != 0
    assert invalid.output.strip() == "Error: No such column: authors.bad"


def test_add_column_foreign_key(db_path):
    """``add-column --fk`` creates a referencing column, optionally targeting ``--fk-col``."""

    def add_column(*extra):
        # Fresh runner per call, always adding to the books table.
        return CliRunner().invoke(cli.cli, ["add-column", db_path, "books", *extra])

    db = Database(db_path)
    db["authors"].insert({"id": 1, "name": "Sally"}, pk="id")
    db["books"].insert({"title": "Hedgehogs of the world"})

    # author_id referencing the authors table
    first = add_column("author_id", "--fk", "authors")
    assert first.exit_code == 0, first.output
    schema_with_author_id = (
        'CREATE TABLE "books" (\n'
        '   "title" TEXT,\n'
        '   "author_id" INTEGER REFERENCES "authors"("id")\n'
        ")"
    )
    assert db["books"].schema == schema_with_author_id

    # A second column that references a custom --fk-col
    second = add_column("author_name_ref", "--fk", "authors", "--fk-col", "name")
    assert second.exit_code == 0, second.output
    schema_with_both = (
        'CREATE TABLE "books" (\n'
        '   "title" TEXT,\n'
        '   "author_id" INTEGER REFERENCES "authors"("id"),\n'
        '   "author_name_ref" TEXT REFERENCES "authors"("name")\n'
        ")"
    )
    assert db["books"].schema == schema_with_both

    # An --fk table that does not exist is rejected
    missing = add_column("author_id", "--fk", "bobcats")
    assert missing.exit_code != 0
    assert "table 'bobcats' does not exist" in str(missing.exception)


def test_suggest_alter_if_column_missing(db_path):
    """Inserting a record with an unknown column fails and suggests ``--alter``."""
    db = Database(db_path)
    db["authors"].insert({"id": 1, "name": "Sally"}, pk="id")
    record_with_extra_column = '{"id": 2, "name": "Barry", "age": 43}'
    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "authors", "-"],
        input=record_with_extra_column,
    )
    assert outcome.exit_code != 0
    expected_message = (
        "Error: table authors has no column named age\n\n"
        "Try using --alter to add additional columns"
    )
    assert outcome.output.strip() == expected_message


def test_index_foreign_keys(db_path):
    """``index-foreign-keys`` adds an index for each foreign key column on ``books``."""
    # Reuse the other test to set up books with its two foreign key columns.
    test_add_column_foreign_key(db_path)
    db = Database(db_path)
    assert db["books"].indexes == []
    outcome = CliRunner().invoke(cli.cli, ["index-foreign-keys", db_path])
    assert outcome.exit_code == 0
    indexed_columns = [index.columns for index in db["books"].indexes]
    assert indexed_columns == [["author_id"], ["author_name_ref"]]


def test_enable_fts(db_path):
    """``enable-fts`` works for a plain table and for a table name containing ``:`` and ``.``."""

    def enable_fts(*extra):
        # Fresh runner per call, always targeting the same database file.
        return CliRunner().invoke(cli.cli, ["enable-fts", db_path, *extra])

    db = Database(db_path)
    assert db["Gosh"].detect_fts() is None
    assert enable_fts("Gosh", "c1", "--fts4").exit_code == 0
    assert db["Gosh"].detect_fts() == "Gosh_fts"

    # Table names with restricted chars are handled correctly.
    # colons and dots are restricted characters for table names.
    awkward_name = "http://example.com"
    db[awkward_name].create({"c1": str, "c2": str, "c3": str})
    assert db[awkward_name].detect_fts() is None
    outcome = enable_fts(awkward_name, "c1", "--fts4", "--tokenize", "porter")
    assert outcome.exit_code == 0
    assert db[awkward_name].detect_fts() == "http://example.com_fts"

    # The tokenize option must show up in the virtual table's schema
    expected_schema = (
        'CREATE VIRTUAL TABLE "http://example.com_fts" USING FTS4 (\n'
        '    "c1",\n'
        "    tokenize='porter',\n"
        '    content="http://example.com"'
        "\n)"
    )
    assert db["http://example.com_fts"].schema == expected_schema
    db[awkward_name].drop()


def test_enable_fts_replace(db_path):
    """Re-running ``enable-fts`` fails unless ``--replace`` is passed."""

    def enable_fts(*extra):
        # Fresh runner per call, always for the Gosh table.
        return CliRunner().invoke(cli.cli, ["enable-fts", db_path, "Gosh", *extra])

    db = Database(db_path)
    assert db["Gosh"].detect_fts() is None

    first = enable_fts("c1", "--fts4")
    assert first.exit_code == 0
    assert db["Gosh"].detect_fts() == "Gosh_fts"
    assert db["Gosh_fts"].columns_dict == {"c1": str}

    # The same command a second time is an error
    second = enable_fts("c1", "--fts4")
    assert second.exit_code == 1
    assert second.output == 'Error: table "Gosh_fts" already exists\n'

    # --replace rebuilds the FTS table with the new column
    third = enable_fts("c2", "--fts4", "--replace")
    assert third.exit_code == 0
    assert db["Gosh_fts"].columns_dict == {"c2": str}


def test_enable_fts_with_triggers(db_path):
    """With ``--create-triggers`` a row inserted afterwards is searchable straight away."""
    Database(db_path)["Gosh"].insert_all([{"c1": "baz"}])
    enable_args = ["enable-fts", db_path, "Gosh", "c1", "--fts4", "--create-triggers"]
    outcome = CliRunner().invoke(cli.cli, enable_args)
    assert outcome.exit_code == 0

    def search(q):
        # Each search opens its own Database on the same file.
        match_sql = "select c1 from Gosh_fts where c1 match ?"
        return Database(db_path).execute(match_sql, [q]).fetchall()

    assert search("baz") == [("baz",)]
    Database(db_path)["Gosh"].insert_all([{"c1": "martha"}])
    assert search("martha") == [("martha",)]


def test_populate_fts(db_path):
    """Without triggers a new row is only searchable after ``populate-fts`` runs."""
    Database(db_path)["Gosh"].insert_all([{"c1": "baz"}])
    enabled = CliRunner().invoke(cli.cli, ["enable-fts", db_path, "Gosh", "c1", "--fts4"])
    assert enabled.exit_code == 0

    def search(q):
        # Each search opens its own Database on the same file.
        match_sql = "select c1 from Gosh_fts where c1 match ?"
        return Database(db_path).execute(match_sql, [q]).fetchall()

    assert search("baz") == [("baz",)]
    Database(db_path)["Gosh"].insert_all([{"c1": "martha"}])
    # The new row is not in the FTS table yet
    assert search("martha") == []
    populated = CliRunner().invoke(cli.cli, ["populate-fts", db_path, "Gosh", "c1"])
    assert populated.exit_code == 0
    assert search("martha") == [("martha",)]


def test_disable_fts(db_path):
    """``disable-fts`` removes every FTS table that ``enable_fts`` created for ``Gosh``."""
    base_tables = {"Gosh", "Gosh2"}
    fts_tables = {
        "Gosh_fts",
        "Gosh_fts_idx",
        "Gosh_fts_data",
        "Gosh_fts_config",
        "Gosh_fts_docsize",
    }
    db = Database(db_path)
    assert set(db.table_names()) == base_tables
    db["Gosh"].enable_fts(["c1"], create_triggers=True)
    assert set(db.table_names()) == base_tables | fts_tables
    outcome = CliRunner().invoke(cli.cli, ["disable-fts", db_path, "Gosh"])
    assert outcome.exit_code == 0
    assert set(db.table_names()) == base_tables


def test_vacuum(db_path):
    """``vacuum`` exits successfully."""
    outcome = CliRunner().invoke(cli.cli, ["vacuum", db_path])
    assert outcome.exit_code == 0


def test_dump(db_path):
    """``dump`` output is wrapped in BEGIN TRANSACTION / COMMIT."""
    dumped = CliRunner().invoke(cli.cli, ["dump", db_path])
    assert dumped.exit_code == 0
    sql_text = dumped.output
    assert sql_text.startswith("BEGIN TRANSACTION;")
    assert sql_text.strip().endswith("COMMIT;")


@pytest.mark.parametrize("tables", ([], ["Gosh"], ["Gosh2"]))
def test_optimize(db_path, tables):
    """``optimize`` succeeds for all or selected tables and does not grow the file much."""
    sample_rows = [
        {"c1": f"verb{n}", "c2": f"noun{n}", "c3": f"adjective{n}"}
        for n in range(10000)
    ]
    db = Database(db_path)
    with db.conn:
        for table in ("Gosh", "Gosh2"):
            db[table].insert_all(sample_rows)
        db["Gosh"].enable_fts(["c1", "c2", "c3"], fts_version="FTS4")
        db["Gosh2"].enable_fts(["c1", "c2", "c3"], fts_version="FTS5")
    bytes_before = os.stat(db_path).st_size
    outcome = CliRunner().invoke(cli.cli, ["optimize", db_path, *tables])
    assert outcome.exit_code == 0
    bytes_after = os.stat(db_path).st_size
    # The file has been seen to end up slightly LARGER after optimizing,
    # which is strange, so a 10000 byte increase is tolerated.
    assert bytes_after <= (bytes_before + 10000)
    # Soundness check that --no-vacuum doesn't throw errors:
    no_vacuum = CliRunner().invoke(cli.cli, ["optimize", "--no-vacuum", db_path])
    assert no_vacuum.exit_code == 0


def test_rebuild_fts_fixes_docsize_error(db_path):
    """``rebuild-fts`` restores the docsize row count after replaced rows inflate it."""
    row_count = 10000
    records = [
        {"c1": f"verb{n}", "c2": f"noun{n}", "c3": f"adjective{n}"}
        for n in range(row_count)
    ]
    db = Database(db_path, recursive_triggers=False)
    with db.conn:
        db["fts5_table"].insert_all(records, pk="c1")
        db["fts5_table"].enable_fts(
            ["c1", "c2", "c3"], fts_version="FTS5", create_triggers=True
        )
    # Search should work
    assert list(db["fts5_table"].search("verb1"))
    # Replicate docsize error from this issue for FTS5
    # https://github.com/simonw/sqlite-utils/issues/149
    assert db["fts5_table_fts_docsize"].count == row_count
    db["fts5_table"].insert_all(records, replace=True)
    assert db["fts5_table"].count == row_count
    assert db["fts5_table_fts_docsize"].count == 2 * row_count
    # Running rebuild-fts should fix this
    outcome = CliRunner().invoke(cli.cli, ["rebuild-fts", db_path, "fts5_table"])
    assert outcome.exit_code == 0
    assert db["fts5_table_fts_docsize"].count == row_count


@pytest.mark.parametrize(
    "format,expected",
    [
        ("--csv", "id,name,age\n1,Cleo,4\n2,Pancakes,2\n"),
        ("--tsv", "id\tname\tage\n1\tCleo\t4\n2\tPancakes\t2\n"),
    ],
)
def test_query_csv(db_path, format, expected):
    """SQL query output as CSV/TSV, with and without the header row."""
    sql = "select id, name, age from dogs"
    dogs = [
        {"id": 1, "age": 4, "name": "Cleo"},
        {"id": 2, "age": 2, "name": "Pancakes"},
    ]
    db = Database(db_path)
    with db.conn:
        db["dogs"].insert_all(dogs)
    with_headers = CliRunner().invoke(cli.cli, [db_path, sql, format])
    assert with_headers.exit_code == 0
    assert with_headers.output.replace("\r", "") == expected
    # Test the no-headers option:
    without_headers = CliRunner().invoke(
        cli.cli, [db_path, sql, "--no-headers", format]
    )
    # Everything after the first line of the expected text is the data rows.
    expected_rest = expected.partition("\n")[2].strip()
    assert without_headers.output.strip().replace("\r", "") == expected_rest


# Queries shared by the JSON output tests: every dog, and only the dog with id 1.
_all_query = "select id, name, age from dogs"
_one_query = _all_query + " where id = 1"


@pytest.mark.parametrize(
    "sql,args,expected",
    [
        (
            _all_query,
            [],
            '[{"id": 1, "name": "Cleo", "age": 4},\n {"id": 2, "name": "Pancakes", "age": 2}]',
        ),
        (
            _all_query,
            ["--nl"],
            '{"id": 1, "name": "Cleo", "age": 4}\n{"id": 2, "name": "Pancakes", "age": 2}',
        ),
        (_all_query, ["--arrays"], '[[1, "Cleo", 4],\n [2, "Pancakes", 2]]'),
        (_all_query, ["--arrays", "--nl"], '[1, "Cleo", 4]\n[2, "Pancakes", 2]'),
        (_one_query, [], '[{"id": 1, "name": "Cleo", "age": 4}]'),
        (_one_query, ["--nl"], '{"id": 1, "name": "Cleo", "age": 4}'),
        (_one_query, ["--arrays"], '[[1, "Cleo", 4]]'),
        (_one_query, ["--arrays", "--nl"], '[1, "Cleo", 4]'),
        (
            "select id, dog(age) from dogs",
            ["--functions", "def dog(i):\n  return i * 7"],
            '[{"id": 1, "dog(age)": 28},\n {"id": 2, "dog(age)": 14}]',
        ),
    ],
)
def test_query_json(db_path, sql, args, expected):
    """SQL query output as JSON objects, arrays and newline-delimited variants."""
    dogs = [
        {"id": 1, "age": 4, "name": "Cleo"},
        {"id": 2, "age": 2, "name": "Pancakes"},
    ]
    db = Database(db_path)
    with db.conn:
        db["dogs"].insert_all(dogs)
    outcome = CliRunner().invoke(cli.cli, [db_path, sql, *args])
    assert outcome.output.strip() == expected


def test_query_json_empty(db_path):
    """A query returning no rows prints an empty JSON array."""
    no_rows_sql = "select * from sqlite_master where 0"
    outcome = CliRunner().invoke(cli.cli, [db_path, no_rows_sql])
    assert outcome.output.strip() == "[]"


def test_query_invalid_function(db_path):
    """Unparseable ``--functions`` code exits with 1 and a functions-definition error."""
    cli_args = [db_path, "select bad()", "--functions", "def invalid_python"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 1
    assert outcome.output.startswith("Error: Error in functions definition:")


# Python source passed to the ``--functions`` option by the tests below.
# ``zero``, ``one`` and ``two`` are called from SQL; ``_two`` is an
# underscore-prefixed helper that test_hidden_functions_are_hidden expects
# NOT to appear in pragma_function_list().
TEST_FUNCTIONS = """
def zero():
    return 0

def one(a):
    return a

def _two(a, b):
    return a + b

def two(a, b):
    return _two(a, b)
"""


def test_query_complex_function(db_path):
    """Functions taking zero, one and two arguments are all callable from SQL."""
    sql = "select zero(), one(1), two(1, 2)"
    cli_args = [db_path, sql, "--functions", TEST_FUNCTIONS]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    rows = json.loads(outcome.output.strip())
    assert rows == [{"zero()": 0, "one(1)": 1, "two(1, 2)": 3}]


@pytest.mark.skipif(
    not _supports_pragma_function_list(),
    reason="Needs SQLite version that supports pragma_function_list()",
)
def test_hidden_functions_are_hidden(db_path):
    """The underscore-prefixed ``_two`` is not listed as a SQL function; the others are."""
    sql = "select name from pragma_function_list()"
    cli_args = [db_path, sql, "--functions", TEST_FUNCTIONS]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    listed = {row["name"] for row in json.loads(outcome.output.strip())}
    for public_name in ("zero", "one", "two"):
        assert public_name in listed
    assert "_two" not in listed


def test_query_functions_from_file(db_path, tmp_path):
    """``--functions`` accepts a path to a file holding the function definitions."""
    # Write the function definitions to a file under tmp_path
    source_file = tmp_path / "my_functions.py"
    source_file.write_text(TEST_FUNCTIONS)

    sql = "select zero(), one(1), two(1, 2)"
    cli_args = [db_path, sql, "--functions", str(source_file)]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    rows = json.loads(outcome.output.strip())
    assert rows == [{"zero()": 0, "one(1)": 1, "two(1, 2)": 3}]


def test_query_functions_file_not_found(db_path):
    """``--functions nonexistent.py`` exits with 1 and a "File not found" message."""
    cli_args = [db_path, "select zero()", "--functions", "nonexistent.py"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 1
    assert "File not found: nonexistent.py" in outcome.output


def test_query_functions_multiple_invocations(db_path):
    """Functions from two separate ``--functions`` options are both available."""
    triple_source = "def triple(x):\n    return x * 3"
    quadruple_source = "def quadruple(x):\n    return x * 4"
    cli_args = [db_path, "select triple(2), quadruple(2)"]
    cli_args += ["--functions", triple_source]
    cli_args += ["--functions", quadruple_source]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    rows = json.loads(outcome.output.strip())
    assert rows == [{"triple(2)": 6, "quadruple(2)": 8}]


def test_query_functions_file_and_inline(db_path, tmp_path):
    """A functions file and inline code can be combined in one invocation."""
    source_file = tmp_path / "file_funcs.py"
    source_file.write_text("def triple(x):\n    return x * 3")
    quadruple_source = "def quadruple(x):\n    return x * 4"

    cli_args = [db_path, "select triple(2), quadruple(2)"]
    cli_args += ["--functions", str(source_file)]
    cli_args += ["--functions", quadruple_source]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    rows = json.loads(outcome.output.strip())
    assert rows == [{"triple(2)": 6, "quadruple(2)": 8}]


# Binary fixture stored in a "data" column by test_query_json_binary, which
# expects the CLI to return it base64-encoded inside JSON.
# NOTE(review): presumably zlib-compressed lorem ipsum text (the value starts
# with the bytes 0x78 0x9c) — confirm before relying on that.
LOREM_IPSUM_COMPRESSED = (
    b"x\x9c\xed\xd1\xcdq\x03!\x0c\x05\xe0\xbb\xabP\x01\x1eW\x91\xdc|M\x01\n\xc8\x8e"
    b"f\xf83H\x1e\x97\x1f\x91M\x8e\xe9\xe0\xdd\x96\x05\x84\xf4\xbek\x9fRI\xc7\xf2J"
    b"\xb9\x97>i\xa9\x11W\xb13\xa5\xde\x96$\x13\xf3I\x9cu\xe8J\xda\xee$EcsI\x8e\x0b"
    b"$\xea\xab\xf6L&u\xc4emI\xb3foFnT\xf83\xca\x93\xd8QZ\xa8\xf2\xbd1q\xd1\x87\xf3"
    b"\x85>\x8c\xa4i\x8d\xdaTu\x7f<c\xc9\xf5L\x0f\xd7E\xad/\x9b\x9eI^2\x93\x1a\x9b"
    b"\xf6F^\n\xd7\xd4\x8f\xca\xfb\x90.\xdd/\xfd\x94\xd4\x11\x87I8\x1a\xaf\xd1S?\x06"
    b"\x88\xa7\xecBo\xbb$\xbb\t\xe9\xf4\xe8\xe4\x98U\x1bM\x19S\xbe\xa4e\x991x\xfc"
    b"x\xf6\xe2#\x9e\x93h'&%YK(i)\x7f\t\xc5@N7\xbf+\x1b\xb5\xdd\x10\r\x9e\xb1\xf0"
    b"y\xa1\xf7W\x92a\xe2;\xc6\xc8\xa0\xa7\xc4\x92\xe2\\\xf2\xa1\x99m\xdf\x88)\xc6"
    b"\xec\x9a\xa5\xed\x14wR\xf1h\xf22x\xcfM\xfdv\xd3\xa4LY\x96\xcc\xbd[{\xd9m\xf0"
    b"\x0eH#\x8e\xf5\x9b\xab\xd7\xcb\xe9t\x05\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03"
    b"\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03"
    b"\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03"
    b"\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03"
    b"\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\xfb\x8f\xef\x1b\x9b\x06\x83}"
)


def test_query_json_binary(db_path):
    """Binary column values come back as ``{"$base64": true, "encoded": ...}`` in JSON output."""
    file_record = {
        "name": "lorem.txt",
        "sz": 16984,
        "data": LOREM_IPSUM_COMPRESSED,
    }
    db = Database(db_path)
    with db.conn:
        db["files"].insert(file_record, pk="name")
    outcome = CliRunner().invoke(cli.cli, [db_path, "select name, sz, data from files"])
    assert outcome.exit_code == 0, str(outcome)
    expected_encoded = (
        "eJzt0c1xAyEMBeC7q1ABHleR3HxNAQrIjmb4M0gelx+RTY7p4N2WBYT0vmufUknH"
        "8kq5lz5pqRFXsTOl3pYkE/NJnHXoStruJEVjc0mOCyTqq/ZMJnXEZW1Js2ZvRm5U+"
        "DPKk9hRWqjyvTFx0YfzhT6MpGmN2lR1fzxjyfVMD9dFrS+bnkleMpMam/ZGXgrX1I"
        "/K+5Au3S/9lNQRh0k4Gq/RUz8GiKfsQm+7JLsJ6fTo5JhVG00ZU76kZZkxePx49uI"
        "jnpNoJyYlWUsoaSl/CcVATje/Kxu13RANnrHweaH3V5Jh4jvGyKCnxJLiXPKhmW3f"
        "iCnG7Jql7RR3UvFo8jJ4z039dtOkTFmWzL1be9lt8A5II471m6vXy+l0BR/4wAc+8"
        "IEPfOADH/jABz7wgQ984AMf+MAHPvCBD3zgAx/4wAc+8IEPfOADH/jABz7wgQ984A"
        "Mf+MAHPvCBD3zgAx/4wAc+8IEPfOADH/jABz7wgQ984PuP7xubBoN9"
    )
    rows = json.loads(outcome.output.strip())
    assert rows == [
        {
            "name": "lorem.txt",
            "sz": 16984,
            "data": {"$base64": True, "encoded": expected_encoded},
        }
    ]


@pytest.mark.parametrize(
    "sql,params,expected",
    [
        ("select 1 + 1 as out", {"p": "2"}, 2),
        ("select 1 + :p as out", {"p": "2"}, 3),
        (
            "select :hello as out",
            {"hello": """This"has'many'quote"s"""},
            """This"has'many'quote"s""",
        ),
    ],
)
def test_query_params(db_path, sql, params, expected):
    """Named parameters passed with ``-p key value`` are bound into the query."""
    # Flatten {key: value} into repeated "-p key value" triples.
    param_args = [
        piece for key, value in params.items() for piece in ("-p", key, value)
    ]
    outcome = CliRunner().invoke(cli.cli, [db_path, sql, *param_args])
    assert outcome.exit_code == 0, str(outcome)
    rows = json.loads(outcome.output.strip())
    assert rows == [{"out": expected}]


def test_query_json_with_json_cols(db_path):
    """A JSON-valued column is output as a string unless ``--json-cols`` is passed."""
    sql = "select id, name, friends from dogs"
    dog = {
        "id": 1,
        "name": "Cleo",
        "friends": [{"name": "Pancakes"}, {"name": "Bailey"}],
    }
    db = Database(db_path)
    with db.conn:
        db["dogs"].insert(dog)

    # Default: the friends column is emitted as an escaped JSON string
    plain = CliRunner().invoke(cli.cli, [db_path, sql])
    expected_plain = r"""
    [{"id": 1, "name": "Cleo", "friends": "[{\"name\": \"Pancakes\"}, {\"name\": \"Bailey\"}]"}]
    """.strip()
    assert plain.output.strip() == expected_plain

    # With --json-cols:
    nested = CliRunner().invoke(cli.cli, [db_path, sql, "--json-cols"])
    expected = r"""
    [{"id": 1, "name": "Cleo", "friends": [{"name": "Pancakes"}, {"name": "Bailey"}]}]
    """.strip()
    assert nested.output.strip() == expected

    # Test rows command too
    from_rows = CliRunner().invoke(cli.cli, ["rows", db_path, "dogs", "--json-cols"])
    assert from_rows.output.strip() == expected


@pytest.mark.parametrize(
    "content,is_binary",
    [(b"\x00\x0fbinary", True), ("this is text", False), (1, False), (1.5, False)],
)
def test_query_raw(db_path, content, is_binary):
    """--raw should write the single selected value with no extra formatting."""
    Database(db_path)["files"].insert({"content": content})
    argv = [db_path, "select content from files", "--raw"]
    outcome = CliRunner().invoke(cli.cli, argv)
    if not is_binary:
        # Text and numeric values are compared through their str() form.
        assert outcome.output == str(content)
        return
    # Binary values are compared against the raw bytes written to stdout.
    assert outcome.stdout_bytes == content


@pytest.mark.parametrize(
    "content,is_binary",
    [(b"\x00\x0fbinary", True), ("this is text", False), (1, False), (1.5, False)],
)
def test_query_raw_lines(db_path, content, is_binary):
    """--raw-lines should write each row's value followed by a newline."""
    copies = 3
    Database(db_path)["files"].insert_all(
        {"content": content} for _ in range(copies)
    )
    argv = [db_path, "select content from files", "--raw-lines"]
    outcome = CliRunner().invoke(cli.cli, argv)
    # Three identical rows means the value + newline repeated three times.
    if is_binary:
        assert outcome.stdout_bytes == (content + b"\n") * copies
    else:
        assert outcome.output == (str(content) + "\n") * copies


def test_query_memory_does_not_create_file(tmpdir):
    """Querying ":memory:" must not leave a file in the working directory."""
    runner = CliRunner()
    version_sql = "select sqlite_version()"
    previous_cwd = os.getcwd()
    try:
        os.chdir(tmpdir)
        # A regular file name is expected to create foo.db on disk ...
        runner.invoke(cli.cli, ["foo.db", version_sql])
        # ... whereas the special :memory: name should create nothing.
        in_memory = runner.invoke(cli.cli, [":memory:", version_sql])
        first_row = json.loads(in_memory.output)[0]
        assert list(first_row.keys()) == ["sqlite_version()"]
    finally:
        # Always restore the working directory for the tests that follow.
        os.chdir(previous_cwd)
    assert os.listdir(tmpdir) == ["foo.db"]


@pytest.mark.parametrize(
    "args,expected",
    [
        (
            [],
            '[{"id": 1, "name": "Cleo", "age": 4},\n {"id": 2, "name": "Pancakes", "age": 2}]',
        ),
        (
            ["--nl"],
            '{"id": 1, "name": "Cleo", "age": 4}\n{"id": 2, "name": "Pancakes", "age": 2}',
        ),
        (["--arrays"], '[[1, "Cleo", 4],\n [2, "Pancakes", 2]]'),
        (["--arrays", "--nl"], '[1, "Cleo", 4]\n[2, "Pancakes", 2]'),
        (
            ["--nl", "-c", "age", "-c", "name"],
            '{"age": 4, "name": "Cleo"}\n{"age": 2, "name": "Pancakes"}',
        ),
        # --limit and --offset
        (
            ["-c", "name", "--limit", "1"],
            '[{"name": "Cleo"}]',
        ),
        (
            ["-c", "name", "--limit", "1", "--offset", "1"],
            '[{"name": "Pancakes"}]',
        ),
        # --where
        (
            ["-c", "name", "--where", "id = 1"],
            '[{"name": "Cleo"}]',
        ),
        (
            ["-c", "name", "--where", "id = :id", "-p", "id", "1"],
            '[{"name": "Cleo"}]',
        ),
        (
            ["-c", "name", "--where", "id = :id", "--param", "id", "1"],
            '[{"name": "Cleo"}]',
        ),
        # --order
        (
            ["-c", "id", "--order", "id desc", "--limit", "1"],
            '[{"id": 2}]',
        ),
        (
            ["-c", "id", "--order", "id", "--limit", "1"],
            '[{"id": 1}]',
        ),
    ],
)
def test_rows(db_path, args, expected):
    """Check ``rows`` output for format, column, paging, filter and order options."""
    dogs = [
        {"id": 1, "age": 4, "name": "Cleo"},
        {"id": 2, "age": 2, "name": "Pancakes"},
    ]
    database = Database(db_path)
    with database.conn:
        # The records list age before name; column_order puts the table
        # columns in the id, name, age sequence the expected output uses.
        database["dogs"].insert_all(dogs, column_order=("id", "name", "age"))
    argv = ["rows", db_path, "dogs", *args]
    outcome = CliRunner().invoke(cli.cli, argv)
    assert outcome.output.strip() == expected


def test_upsert(db_path, tmpdir):
    """Insert two dogs, then upsert partial records and check only ``age`` changes."""
    json_path = str(tmpdir / "dogs.json")
    database = Database(db_path)
    runner = CliRunner()

    # Seed the table with complete records via the insert command.
    write_json(
        json_path,
        [
            {"id": 1, "name": "Cleo", "age": 4},
            {"id": 2, "name": "Nixie", "age": 4},
        ],
    )
    inserted = runner.invoke(
        cli.cli,
        ["insert", db_path, "dogs", json_path, "--pk", "id"],
        catch_exceptions=False,
    )
    assert inserted.exit_code == 0, inserted.output
    assert database["dogs"].count == 2

    # The upsert payload carries only id and age, so names must survive.
    write_json(
        json_path,
        [
            {"id": 1, "age": 5},
            {"id": 2, "age": 5},
        ],
    )
    upserted = runner.invoke(
        cli.cli,
        ["upsert", db_path, "dogs", json_path, "--pk", "id"],
        catch_exceptions=False,
    )
    assert upserted.exit_code == 0, upserted.output
    stored = list(database.query("select * from dogs order by id"))
    assert stored == [
        {"id": 1, "name": "Cleo", "age": 5},
        {"id": 2, "name": "Nixie", "age": 5},
    ]


def test_upsert_pk_required(db_path, tmpdir):
    """``upsert`` without --pk should exit with a usage error (code 2)."""
    json_path = str(tmpdir / "dogs.json")
    write_json(
        json_path,
        [
            {"id": 1, "name": "Cleo", "age": 4},
            {"id": 2, "name": "Nixie", "age": 4},
        ],
    )
    argv = ["upsert", db_path, "dogs", json_path]
    outcome = CliRunner().invoke(cli.cli, argv, catch_exceptions=False)
    assert outcome.exit_code == 2
    assert "Error: Missing option '--pk'" in outcome.output


def test_upsert_analyze(db_path, tmpdir):
    """``upsert --analyze`` should result in a sqlite_stat1 table being created."""
    database = Database(db_path)
    rows_table = database["rows"]
    rows_table.insert({"id": 1, "foo": "x", "n": 3}, pk="id")
    rows_table.create_index(["n"])
    # Precondition: no statistics table exists before the command runs.
    assert "sqlite_stat1" not in database.table_names()

    argv = ["upsert", db_path, "rows", "-", "--nl", "--analyze", "--pk", "id"]
    outcome = CliRunner().invoke(
        cli.cli, argv, input='{"id": 2, "foo": "bar", "n": 1}'
    )
    assert outcome.exit_code == 0, outcome.output
    assert "sqlite_stat1" in database.table_names()


def test_upsert_flatten(tmpdir):
    """``upsert --flatten --alter`` should store nested keys as ``outer_inner`` columns."""
    db_path = str(tmpdir / "flat.db")
    database = Database(db_path)
    database["upsert_me"].insert({"id": 1, "name": "Example"}, pk="id")

    payload = json.dumps({"id": 1, "nested": {"two": 2}})
    argv = ["upsert", db_path, "upsert_me", "-", "--flatten", "--pk", "id", "--alter"]
    outcome = CliRunner().invoke(cli.cli, argv, input=payload)
    assert outcome.exit_code == 0

    stored = list(database.query("select * from upsert_me"))
    assert stored == [{"id": 1, "name": "Example", "nested_two": 2}]


def test_upsert_alter(db_path, tmpdir):
    """Upserting an unknown column fails without --alter and succeeds with it."""
    json_path = str(tmpdir / "dogs.json")
    database = Database(db_path)
    runner = CliRunner()

    write_json(json_path, [{"id": 1, "name": "Cleo"}])
    seeded = runner.invoke(
        cli.cli, ["insert", db_path, "dogs", json_path, "--pk", "id"]
    )
    assert seeded.exit_code == 0, seeded.output

    # The upsert payload introduces an "age" column the table does not have.
    write_json(json_path, [{"id": 1, "age": 5}])
    upsert_argv = ["upsert", db_path, "dogs", json_path, "--pk", "id"]

    rejected = runner.invoke(cli.cli, upsert_argv)
    assert rejected.exit_code == 1
    # The error output is expected to point the user at --alter.
    assert "Try using --alter to add additional columns" in rejected.output.strip()

    accepted = runner.invoke(cli.cli, upsert_argv + ["--alter"])
    assert accepted.exit_code == 0
    stored = list(database.query("select * from dogs order by id"))
    assert stored == [
        {"id": 1, "name": "Cleo", "age": 5},
    ]


@pytest.mark.parametrize(
    "args,schema",
    [
        # No primary key
        (
            [
                "name",
                "text",
                "age",
                "integer",
            ],
            ('CREATE TABLE "t" (\n   "name" TEXT,\n   "age" INTEGER\n)'),
        ),
        # All types:
        (
            [
                "id",
                "integer",
                "name",
                "text",
                "age",
                "integer",
                "weight",
                "float",
                "thumbnail",
                "blob",
                "--pk",
                "id",
            ],
            (
                'CREATE TABLE "t" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "name" TEXT,\n'
                '   "age" INTEGER,\n'
                '   "weight" FLOAT,\n'
                '   "thumbnail" BLOB\n'
                ")"
            ),
        ),
        # Not null:
        (
            ["name", "text", "--not-null", "name"],
            ('CREATE TABLE "t" (\n' '   "name" TEXT NOT NULL\n' ")"),
        ),
        # Default:
        (
            ["age", "integer", "--default", "age", "3"],
            ('CREATE TABLE "t" (\n' "   \"age\" INTEGER DEFAULT '3'\n" ")"),
        ),
        # Compound primary key
        (
            ["category", "text", "name", "text", "--pk", "category", "--pk", "name"],
            (
                'CREATE TABLE "t" (\n   "category" TEXT,\n   "name" TEXT,\n'
                '   PRIMARY KEY ("category", "name")\n)'
            ),
        ),
    ],
)
def test_create_table(args, schema):
    """``create-table`` should produce exactly the expected CREATE TABLE statement."""
    runner = CliRunner()
    argv = ["create-table", "test.db", "t", *args]
    # Work in a throwaway directory so test.db never touches the real cwd.
    with runner.isolated_filesystem():
        outcome = runner.invoke(cli.cli, argv, catch_exceptions=False)
        assert outcome.exit_code == 0
        database = Database("test.db")
        assert database["t"].schema == schema


def test_create_table_foreign_key():
    """``create-table --fk`` should add a REFERENCES clause to the column."""
    runner = CliRunner()
    authors_args = ["authors", "id", "integer", "name", "text", "--pk", "id"]
    books_args = [
        "books",
        "id",
        "integer",
        "title",
        "text",
        "author_id",
        "integer",
        "--pk",
        "id",
        "--fk",
        "author_id",
        "authors",
        "id",
    ]
    authors_schema = (
        'CREATE TABLE "authors" (\n'
        '   "id" INTEGER PRIMARY KEY,\n'
        '   "name" TEXT\n'
        ")"
    )
    books_schema = (
        'CREATE TABLE "books" (\n'
        '   "id" INTEGER PRIMARY KEY,\n'
        '   "title" TEXT,\n'
        '   "author_id" INTEGER REFERENCES "authors"("id")\n'
        ")"
    )
    with runner.isolated_filesystem():
        # authors is created first so that books can reference it.
        for table_args in (authors_args, books_args):
            outcome = runner.invoke(
                cli.cli,
                ["create-table", "books.db", *table_args],
                catch_exceptions=False,
            )
            assert outcome.exit_code == 0
        database = Database("books.db")
        assert database["authors"].schema == authors_schema
        assert database["books"].schema == books_schema


def test_create_table_error_if_table_exists():
    """``create-table`` on an existing table should exit 1 with a hint about --replace."""
    runner = CliRunner()
    argv = ["create-table", "test.db", "dogs", "id", "integer"]
    message = (
        'Error: Table "dogs" already exists. Use --replace to delete and replace it.'
    )
    with runner.isolated_filesystem():
        database = Database("test.db")
        database["dogs"].insert({"name": "Cleo"})
        outcome = runner.invoke(cli.cli, argv)
        assert outcome.exit_code == 1
        assert outcome.output.strip() == message


def test_create_table_ignore():
    """``create-table --ignore`` should leave an existing table untouched."""
    runner = CliRunner()
    argv = ["create-table", "test.db", "dogs", "id", "integer", "--ignore"]
    with runner.isolated_filesystem():
        database = Database("test.db")
        database["dogs"].insert({"name": "Cleo"})
        outcome = runner.invoke(cli.cli, argv)
        assert outcome.exit_code == 0
        # The schema is still the one produced by the original insert.
        assert database["dogs"].schema == 'CREATE TABLE "dogs" (\n   "name" TEXT\n)'


def test_create_table_replace():
    """``create-table --replace`` should swap an existing table for the new definition."""
    runner = CliRunner()
    argv = ["create-table", "test.db", "dogs", "id", "integer", "--replace"]
    with runner.isolated_filesystem():
        database = Database("test.db")
        database["dogs"].insert({"name": "Cleo"})
        outcome = runner.invoke(cli.cli, argv)
        assert outcome.exit_code == 0
        # The schema now reflects the columns given on the command line.
        assert database["dogs"].schema == 'CREATE TABLE "dogs" (\n   "id" INTEGER\n)'


def test_create_view():
    """``create-view`` should create a view with the given SQL."""
    runner = CliRunner()
    argv = ["create-view", "test.db", "version", "select sqlite_version()"]
    with runner.isolated_filesystem():
        database = Database("test.db")
        outcome = runner.invoke(cli.cli, argv)
        assert outcome.exit_code == 0
        created = database["version"].schema
        assert created == 'CREATE VIEW "version" AS select sqlite_version()'


def test_create_view_error_if_view_exists():
    """``create-view`` on an existing view should exit 1 with a hint about --replace."""
    runner = CliRunner()
    argv = ["create-view", "test.db", "version", "select sqlite_version()"]
    message = (
        'Error: View "version" already exists. Use --replace to delete and replace it.'
    )
    with runner.isolated_filesystem():
        database = Database("test.db")
        database.create_view("version", "select sqlite_version() + 1")
        outcome = runner.invoke(cli.cli, argv)
        assert outcome.exit_code == 1
        assert outcome.output.strip() == message


def test_create_view_ignore():
    """``create-view --ignore`` should keep the existing view definition."""
    runner = CliRunner()
    argv = [
        "create-view",
        "test.db",
        "version",
        "select sqlite_version()",
        "--ignore",
    ]
    with runner.isolated_filesystem():
        database = Database("test.db")
        database.create_view("version", "select sqlite_version() + 1")
        outcome = runner.invoke(cli.cli, argv)
        assert outcome.exit_code == 0
        # The original "+ 1" definition must still be in place.
        kept = database["version"].schema
        assert kept == 'CREATE VIEW "version" AS select sqlite_version() + 1'


def test_create_view_replace():
    """``create-view --replace`` should overwrite the existing view definition."""
    runner = CliRunner()
    argv = [
        "create-view",
        "test.db",
        "version",
        "select sqlite_version()",
        "--replace",
    ]
    with runner.isolated_filesystem():
        database = Database("test.db")
        database.create_view("version", "select sqlite_version() + 1")
        outcome = runner.invoke(cli.cli, argv)
        assert outcome.exit_code == 0
        # The "+ 1" definition has been replaced by the new SQL.
        replaced = database["version"].schema
        assert replaced == 'CREATE VIEW "version" AS select sqlite_version()'


def test_drop_table():
    """``drop-table`` should remove an existing table."""
    runner = CliRunner()
    with runner.isolated_filesystem():
        database = Database("test.db")
        database["t"].create({"pk": int}, pk="pk")
        # Precondition: the table exists before the command runs.
        assert "t" in database.table_names()
        outcome = runner.invoke(cli.cli, ["drop-table", "test.db", "t"])
        assert outcome.exit_code == 0
        assert "t" not in database.table_names()


def test_drop_table_error():
    """``drop-table`` on a missing table errors unless --ignore is passed."""
    runner = CliRunner()
    missing_argv = ["drop-table", "test.db", "t2"]
    with runner.isolated_filesystem():
        database = Database("test.db")
        database["t"].create({"pk": int}, pk="pk")

        failed = runner.invoke(cli.cli, missing_argv)
        assert failed.exit_code == 1
        assert failed.output.strip() == 'Error: Table "t2" does not exist'

        # The same command with --ignore should exit cleanly.
        ignored = runner.invoke(cli.cli, missing_argv + ["--ignore"])
        assert ignored.exit_code == 0


def test_drop_view():
    """``drop-view`` should remove an existing view."""
    runner = CliRunner()
    with runner.isolated_filesystem():
        database = Database("test.db")
        database.create_view("hello", "select 1")
        # Precondition: the view exists before the command runs.
        assert "hello" in database.view_names()
        outcome = runner.invoke(cli.cli, ["drop-view", "test.db", "hello"])
        assert outcome.exit_code == 0
        assert "hello" not in database.view_names()


def test_drop_view_error():
    """``drop-view`` on a missing view errors unless --ignore is passed."""
    runner = CliRunner()
    missing_argv = ["drop-view", "test.db", "t2"]
    with runner.isolated_filesystem():
        # The database only contains a table named "t"; no view "t2" exists.
        database = Database("test.db")
        database["t"].create({"pk": int}, pk="pk")

        failed = runner.invoke(cli.cli, missing_argv)
        assert failed.exit_code == 1
        assert failed.output.strip() == 'Error: View "t2" does not exist'

        # The same command with --ignore should exit cleanly.
        ignored = runner.invoke(cli.cli, missing_argv + ["--ignore"])
        assert ignored.exit_code == 0


def test_enable_wal():
    """``enable-wal`` should switch every listed database to WAL journal mode."""
    runner = CliRunner()
    db_names = ["test.db", "test2.db"]
    with runner.isolated_filesystem():
        for db_name in db_names:
            fresh = Database(db_name)
            fresh["t"].create({"pk": int}, pk="pk")
            # Newly created databases start out in "delete" mode.
            assert fresh.journal_mode == "delete"
        outcome = runner.invoke(
            cli.cli, ["enable-wal", *db_names], catch_exceptions=False
        )
        assert outcome.exit_code == 0
        # Re-open each file to read the journal mode the command left behind.
        for db_name in db_names:
            reopened = Database(db_name)
            assert reopened.journal_mode == "wal"


def test_disable_wal():
    """``disable-wal`` should switch every listed database back to "delete" mode."""
    runner = CliRunner()
    db_names = ["test.db", "test2.db"]
    with runner.isolated_filesystem():
        for db_name in db_names:
            fresh = Database(db_name)
            fresh["t"].create({"pk": int}, pk="pk")
            # Put the database into WAL mode so there is something to undo.
            fresh.enable_wal()
            assert fresh.journal_mode == "wal"
        outcome = runner.invoke(cli.cli, ["disable-wal", *db_names])
        assert outcome.exit_code == 0
        # Re-open each file to read the journal mode the command left behind.
        for db_name in db_names:
            reopened = Database(db_name)
            assert reopened.journal_mode == "delete"


@pytest.mark.parametrize(
    "args,expected",
    [
        (
            [],
            '[{"rows_affected": 1}]',
        ),
        (["-t"], "rows_affected\n---------------\n              1"),
    ],
)
def test_query_update(db_path, args, expected):
    """An UPDATE run through the query command should report ``rows_affected``."""
    database = Database(db_path)
    with database.conn:
        database["dogs"].insert_all([{"id": 1, "age": 4, "name": "Cleo"}])
    update_sql = "update dogs set age = 5 where name = 'Cleo'"
    outcome = CliRunner().invoke(cli.cli, [db_path, update_sql, *args])
    assert outcome.output.strip() == expected
    # The change must also be visible when reading the table back.
    stored = list(database.query("select * from dogs"))
    assert stored == [
        {"id": 1, "age": 5, "name": "Cleo"},
    ]


def test_add_foreign_keys(db_path):
    """``add-foreign-keys`` should accept several table/column/other/other-column groups."""
    database = Database(db_path)
    database["countries"].insert({"id": 7, "name": "Panama"}, pk="id")
    database["authors"].insert(
        {"id": 3, "name": "Matilda", "country_id": 7}, pk="id"
    )
    database["books"].insert(
        {"id": 2, "title": "Wolf anatomy", "author_id": 3}, pk="id"
    )
    # Precondition: neither table has any foreign keys yet.
    assert database["authors"].foreign_keys == []
    assert database["books"].foreign_keys == []

    # Two foreign keys, each given as four positional values.
    authors_fk = ["authors", "country_id", "countries", "id"]
    books_fk = ["books", "author_id", "authors", "id"]
    outcome = CliRunner().invoke(
        cli.cli, ["add-foreign-keys", db_path, *authors_fk, *books_fk]
    )
    assert outcome.exit_code == 0

    expected_authors = [
        ForeignKey(
            table="authors",
            column="country_id",
            other_table="countries",
            other_column="id",
        )
    ]
    expected_books = [
        ForeignKey(
            table="books", column="author_id", other_table="authors", other_column="id"
        )
    ]
    assert database["authors"].foreign_keys == expected_authors
    assert database["books"].foreign_keys == expected_books


@pytest.mark.parametrize(
    "args,expected_schema",
    [
        (
            [],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                "   \"age\" INTEGER NOT NULL DEFAULT '1',\n"
                '   "name" TEXT\n'
                ")"
            ),
        ),
        (
            ["--type", "age", "text"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                "   \"age\" TEXT NOT NULL DEFAULT '1',\n"
                '   "name" TEXT\n'
                ")"
            ),
        ),
        (
            ["--drop", "age"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "name" TEXT\n'
                ")"
            ),
        ),
        (
            ["--rename", "age", "age2", "--rename", "id", "pk"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "pk" INTEGER PRIMARY KEY,\n'
                "   \"age2\" INTEGER NOT NULL DEFAULT '1',\n"
                '   "name" TEXT\n'
                ")"
            ),
        ),
        (
            ["--not-null", "name"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                "   \"age\" INTEGER NOT NULL DEFAULT '1',\n"
                '   "name" TEXT NOT NULL\n'
                ")"
            ),
        ),
        (
            ["--not-null-false", "age"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                "   \"age\" INTEGER DEFAULT '1',\n"
                '   "name" TEXT\n'
                ")"
            ),
        ),
        (
            ["--pk", "name"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER,\n'
                "   \"age\" INTEGER NOT NULL DEFAULT '1',\n"
                '   "name" TEXT PRIMARY KEY\n'
                ")"
            ),
        ),
        (
            ["--pk-none"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER,\n'
                "   \"age\" INTEGER NOT NULL DEFAULT '1',\n"
                '   "name" TEXT\n'
                ")"
            ),
        ),
        (
            ["--default", "name", "Turnip"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                "   \"age\" INTEGER NOT NULL DEFAULT '1',\n"
                "   \"name\" TEXT DEFAULT 'Turnip'\n"
                ")"
            ),
        ),
        (
            ["--default-none", "age"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "age" INTEGER NOT NULL,\n'
                '   "name" TEXT\n'
                ")"
            ),
        ),
        (
            ["-o", "name", "--column-order", "age", "-o", "id"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "name" TEXT,\n'
                "   \"age\" INTEGER NOT NULL DEFAULT '1',\n"
                '   "id" INTEGER PRIMARY KEY\n'
                ")"
            ),
        ),
    ],
)
def test_transform(db_path, args, expected_schema):
    """Apply ``transform`` options to a small "dogs" table and compare the schema.

    The table starts with ``id`` as its primary key and ``age`` declared
    NOT NULL with a default of 1, so each parametrized case shows the effect
    of one group of options on that baseline (the first case passes no
    options at all).
    """
    db = Database(db_path)
    with db.conn:
        db["dogs"].insert(
            {"id": 1, "age": 4, "name": "Cleo"},
            not_null={"age"},
            defaults={"age": 1},
            pk="id",
        )
    result = CliRunner().invoke(cli.cli, ["transform", db_path, "dogs"] + args)
    # Attach the CLI output to the failure message rather than printing it
    # unconditionally, so a non-zero exit explains itself in the report.
    assert result.exit_code == 0, result.output
    assert db["dogs"].schema == expected_schema


@pytest.mark.parametrize(
    "extra_args,expected_schema",
    (
        (
            ["--drop-foreign-key", "country"],
            (
                'CREATE TABLE "places" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "name" TEXT,\n'
                '   "country" INTEGER,\n'
                '   "city" INTEGER REFERENCES "city"("id"),\n'
                '   "continent" INTEGER\n'
                ")"
            ),
        ),
        (
            ["--drop-foreign-key", "country", "--drop-foreign-key", "city"],
            (
                'CREATE TABLE "places" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "name" TEXT,\n'
                '   "country" INTEGER,\n'
                '   "city" INTEGER,\n'
                '   "continent" INTEGER\n'
                ")"
            ),
        ),
        (
            ["--add-foreign-key", "continent", "continent", "id"],
            (
                'CREATE TABLE "places" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "name" TEXT,\n'
                '   "country" INTEGER REFERENCES "country"("id"),\n'
                '   "city" INTEGER REFERENCES "city"("id"),\n'
                '   "continent" INTEGER REFERENCES "continent"("id")\n'
                ")"
            ),
        ),
    ),
)
def test_transform_add_or_drop_foreign_key(db_path, extra_args, expected_schema):
    """``transform`` should be able to drop existing foreign keys and add new ones."""
    lookup_rows = (
        ("continent", {"id": 1, "name": "Europe"}),
        ("country", {"id": 1, "name": "France"}),
        ("city", {"id": 24, "name": "Paris"}),
    )
    place = {
        "id": 32,
        "name": "Caveau de la Huchette",
        "country": 1,
        "city": 24,
        "continent": 1,
    }
    database = Database(db_path)
    with database.conn:
        for table_name, row in lookup_rows:
            database[table_name].insert(row, pk="id")
        # Only country and city are declared as foreign keys; continent is
        # left as a plain integer so one case can add a key to it while the
        # other cases drop one or both of the existing keys.
        database["places"].insert(
            place,
            foreign_keys=("country", "city"),
            pk="id",
        )
    argv = ["transform", db_path, "places", *extra_args]
    outcome = CliRunner().invoke(cli.cli, argv)
    assert outcome.exit_code == 0
    assert database["places"].schema == expected_schema


# Schema of the lookup table test_extract expects when the extracted table
# keeps its default name ("species").
_common_other_schema = (
    'CREATE TABLE "species" (\n'
    '   "id" INTEGER PRIMARY KEY,\n'
    '   "species" TEXT\n'
    ")"
)


@pytest.mark.parametrize(
    "args,expected_table_schema,expected_other_schema",
    [
        (
            [],
            (
                'CREATE TABLE "trees" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "address" TEXT,\n'
                '   "species_id" INTEGER REFERENCES "species"("id")\n'
                ")"
            ),
            _common_other_schema,
        ),
        (
            ["--table", "custom_table"],
            (
                'CREATE TABLE "trees" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "address" TEXT,\n'
                '   "custom_table_id" INTEGER REFERENCES "custom_table"("id")\n'
                ")"
            ),
            'CREATE TABLE "custom_table" (\n   "id" INTEGER PRIMARY KEY,\n   "species" TEXT\n)',
        ),
        (
            ["--fk-column", "custom_fk"],
            (
                'CREATE TABLE "trees" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "address" TEXT,\n'
                '   "custom_fk" INTEGER REFERENCES "species"("id")\n'
                ")"
            ),
            _common_other_schema,
        ),
        (
            ["--rename", "name", "name2"],
            (
                'CREATE TABLE "trees" (\n'
                '   "id" INTEGER PRIMARY KEY,\n'
                '   "address" TEXT,\n'
                '   "species_id" INTEGER REFERENCES "species"("id")\n'
                ")"
            ),
            # Same lookup-table schema as the default case, so reuse the
            # shared constant instead of repeating the literal.
            _common_other_schema,
        ),
    ],
)
def test_extract(db_path, args, expected_table_schema, expected_other_schema):
    """Extract the ``species`` column of "trees" and check both resulting schemas.

    ``expected_table_schema`` is the schema of "trees" after extraction and
    ``expected_other_schema`` is the schema of the lookup table that the
    command creates.
    """
    db = Database(db_path)
    with db.conn:
        db["trees"].insert(
            {"id": 1, "address": "4 Park Ave", "species": "Palm"},
            pk="id",
        )
    result = CliRunner().invoke(
        cli.cli, ["extract", db_path, "trees", "species"] + args
    )
    # Attach the CLI output to the failure message rather than printing it
    # unconditionally, so a non-zero exit explains itself in the report.
    assert result.exit_code == 0, result.output
    assert db["trees"].schema == expected_table_schema
    # Ignore "trees" and the two tables excluded by name here (Gosh, Gosh2);
    # the first remaining table is taken to be the lookup table.
    lookup_tables = [
        t for t in db.tables if t.name not in ("trees", "Gosh", "Gosh2")
    ]
    assert lookup_tables[0].schema == expected_other_schema


def test_insert_encoding(tmpdir):
    """A latin-1 CSV must fail when read as utf-8 and load with --encoding latin-1."""
    db_path = str(tmpdir / "test.db")
    csv_path = str(tmpdir / "test.csv")
    latin1_csv = (
        b"date,name,latitude,longitude\n"
        b"2020-01-01,Barra da Lagoa,-27.574,-48.422\n"
        b"2020-03-04,S\xe3o Paulo,-23.561,-46.645\n"
        b"2020-04-05,Salta,-24.793:-65.408"
    )
    # Sanity check: the \xe3 byte really does decode to "ã" under latin-1.
    decoded_lines = latin1_csv.decode("latin-1").split("\n")
    assert decoded_lines[2].split(",")[1] == "São Paulo"
    with open(csv_path, "wb") as fp:
        fp.write(latin1_csv)

    runner = CliRunner()
    base_argv = ["insert", db_path, "places", csv_path]

    # Without --encoding the command should exit 1 with an explanatory message.
    failed = runner.invoke(cli.cli, base_argv + ["--csv"], catch_exceptions=False)
    assert failed.exit_code == 1
    assert (
        "The input you provided uses a character encoding other than utf-8"
        in failed.output
    )

    # With --encoding latin-1 the same file should load.
    succeeded = runner.invoke(
        cli.cli,
        base_argv + ["--encoding", "latin-1", "--csv", "--no-detect-types"],
        catch_exceptions=False,
    )
    assert succeeded.exit_code == 0

    columns = ("date", "name", "latitude", "longitude")
    expected_values = [
        ("2020-01-01", "Barra da Lagoa", "-27.574", "-48.422"),
        ("2020-03-04", "São Paulo", "-23.561", "-46.645"),
        # The last CSV line has ":" where a "," would be, so it only has
        # three fields and longitude comes back as None.
        ("2020-04-05", "Salta", "-24.793:-65.408", None),
    ]
    database = Database(db_path)
    assert list(database["places"].rows) == [
        dict(zip(columns, values)) for values in expected_values
    ]


@pytest.mark.parametrize("fts", ["FTS4", "FTS5"])
@pytest.mark.parametrize(
    "extra_arg,expected",
    [
        (
            None,
            '[{"rowid": 2, "id": 2, "title": "Title the second"}]\n',
        ),
        ("--csv", "rowid,id,title\n2,2,Title the second\n"),
    ],
)
def test_search(tmpdir, fts, extra_arg, expected):
    """``search`` should return the matching row as JSON or CSV for FTS4 and FTS5."""
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    articles = [
        {"id": number, "title": "Title the {}".format(ordinal)}
        for number, ordinal in enumerate(("first", "second", "third"), start=1)
    ]
    database["articles"].insert_all(articles, pk="id")
    database["articles"].enable_fts(["title"], fts_version=fts)

    argv = ["search", db_path, "articles", "second"]
    if extra_arg:
        argv.append(extra_arg)
    outcome = CliRunner().invoke(cli.cli, argv, catch_exceptions=False)
    assert outcome.exit_code == 0
    # Carriage returns are stripped before comparing with the expected text.
    assert outcome.output.replace("\r", "") == expected


def test_search_quote(tmpdir):
    """A search term with a stray double quote fails unless --quote is used."""
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    creatures = database["creatures"].insert({"name": "dog."})
    creatures.enable_fts(["name"])

    runner = CliRunner()
    search_argv = ["search", db_path, "creatures", 'dog"']

    # Unquoted, the command should exit 1 and suggest --quote.
    unquoted = runner.invoke(cli.cli, search_argv)
    assert unquoted.exit_code == 1
    assert unquoted.output == (
        "Error: unterminated string\n\n"
        "Try running this again with the --quote option\n"
    )

    # With --quote the same term should find the row.
    quoted = runner.invoke(cli.cli, search_argv + ["--quote"])
    assert quoted.exit_code == 0
    assert quoted.output.strip() == '[{"rowid": 1, "name": "dog."}]'


def test_indexes(tmpdir):
    """``indexes`` should list index columns, plus an auxiliary row with --aux."""

    def index_row(seqno, cid, name, desc, key):
        # Every expected row shares the table, index name and collation.
        return {
            "table": "Gosh",
            "index_name": "Gosh_idx",
            "seqno": seqno,
            "cid": cid,
            "name": name,
            "desc": desc,
            "coll": "BINARY",
            "key": key,
        }

    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    database.conn.executescript(
        """
        create table Gosh (c1 text, c2 text, c3 text);
        create index Gosh_idx on Gosh(c2, c3 desc);
    """
    )
    # The two columns named in the index; c3 is declared "desc".
    key_rows = [
        index_row(0, 1, "c2", 0, 1),
        index_row(1, 2, "c3", 1, 1),
    ]
    # The additional row expected with --aux: cid -1, no name, key 0.
    aux_row = index_row(2, -1, None, 0, 0)

    runner = CliRunner()
    plain = runner.invoke(cli.cli, ["indexes", db_path], catch_exceptions=False)
    assert plain.exit_code == 0
    assert json.loads(plain.output) == key_rows

    with_aux = runner.invoke(
        cli.cli, ["indexes", db_path, "--aux"], catch_exceptions=False
    )
    assert with_aux.exit_code == 0
    assert json.loads(with_aux.output) == key_rows + [aux_row]


# Output test_triggers expects from the ``triggers`` command whenever the
# "blah" trigger is listed. The raw-string pieces keep the backslash-n
# sequences literal, as they appear inside the JSON "sql" value.
_TRIGGERS_EXPECTED = (
    '[{"name": "blah", "table": "articles", '
    r'"sql": "CREATE TRIGGER blah AFTER INSERT ON articles\nBEGIN\n'
    r'    UPDATE counter SET count = count + 1;\nEND"}]'
    "\n"
)


@pytest.mark.parametrize(
    "extra_args,expected",
    [
        ([], _TRIGGERS_EXPECTED),
        (["articles"], _TRIGGERS_EXPECTED),
        (["counter"], "[]\n"),
    ],
)
def test_triggers(tmpdir, extra_args, expected):
    """The triggers command lists all triggers, or only those on named tables."""
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    database["articles"].insert({"id": 1, "title": "Title the first"}, pk="id")
    database["counter"].insert({"count": 1})
    create_trigger_sql = textwrap.dedent(
        """
        CREATE TRIGGER blah AFTER INSERT ON articles
        BEGIN
            UPDATE counter SET count = count + 1;
        END
    """
    )
    database.conn.execute(create_trigger_sql)
    # Any table names are passed as trailing positional arguments.
    outcome = CliRunner().invoke(
        cli.cli,
        ["triggers", db_path, *extra_args],
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0
    assert outcome.output == expected


@pytest.mark.parametrize(
    "options,expected",
    (
        (
            [],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER,\n'
                '   "name" TEXT\n'
                ");\n"
                'CREATE TABLE "chickens" (\n'
                '   "id" INTEGER,\n'
                '   "name" TEXT,\n'
                '   "breed" TEXT\n'
                ");\n"
                'CREATE INDEX "idx_chickens_breed"\n'
                '    ON "chickens" ("breed");\n'
            ),
        ),
        (
            ["dogs"],
            (
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER,\n'
                '   "name" TEXT\n'
                ")\n"
            ),
        ),
        (
            ["chickens", "dogs"],
            (
                'CREATE TABLE "chickens" (\n'
                '   "id" INTEGER,\n'
                '   "name" TEXT,\n'
                '   "breed" TEXT\n'
                ")\n"
                'CREATE TABLE "dogs" (\n'
                '   "id" INTEGER,\n'
                '   "name" TEXT\n'
                ")\n"
            ),
        ),
    ),
)
def test_schema(tmpdir, options, expected):
    """The schema command prints the full schema, or only the tables asked for."""
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    # "dogs" is created before "chickens"; the full-schema case expects that order.
    table_definitions = (
        ("dogs", {"id": int, "name": str}),
        ("chickens", {"id": int, "name": str, "breed": str}),
    )
    for table_name, columns in table_definitions:
        database[table_name].create(columns)
    database["chickens"].create_index(["breed"])
    outcome = CliRunner().invoke(
        cli.cli,
        ["schema", db_path, *options],
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0
    assert outcome.output == expected


def test_long_csv_column_value(tmpdir):
    """A CSV value of 131073 characters is imported without truncation."""
    db_path = str(tmpdir / "test.db")
    csv_path = str(tmpdir / "test.csv")
    # NOTE(review): 131073 is presumably one more than the csv module's default
    # field size limit - confirm against the CLI implementation.
    long_string = "a" * 131073
    with open(csv_path, "w") as csv_file:
        csv_file.write("id,text\n")
        csv_file.write(f"1,{long_string}\n")
    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "bigtable", csv_path, "--csv"],
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0
    imported = list(Database(db_path)["bigtable"].rows)
    assert len(imported) == 1
    assert imported[0]["text"] == long_string


@pytest.mark.parametrize(
    "args,tsv",
    (
        (["--csv", "--no-headers"], False),
        (["--no-headers"], False),
        (["--tsv", "--no-headers"], True),
    ),
)
def test_import_no_headers(tmpdir, args, tsv):
    """With --no-headers, columns are named untitled_1, untitled_2, and so on."""
    db_path = str(tmpdir / "test.db")
    csv_path = str(tmpdir / "test.csv")
    delimiter = "\t" if tsv else ","
    records = (("Cleo", "Dog", "5"), ("Tracy", "Spider", "7"))
    with open(csv_path, "w") as csv_file:
        csv_file.writelines(delimiter.join(record) + "\n" for record in records)
    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "creatures", csv_path, *args, "--no-detect-types"],
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0, outcome.output
    creatures = Database(db_path)["creatures"]
    assert creatures.schema == (
        'CREATE TABLE "creatures" (\n'
        '   "untitled_1" TEXT,\n'
        '   "untitled_2" TEXT,\n'
        '   "untitled_3" TEXT\n'
        ")"
    )
    assert list(creatures.rows) == [
        {"untitled_1": "Cleo", "untitled_2": "Dog", "untitled_3": "5"},
        {"untitled_1": "Tracy", "untitled_2": "Spider", "untitled_3": "7"},
    ]


def test_attach(tmpdir):
    """--attach makes a second database file queryable under the given alias."""
    foo_path = str(tmpdir / "foo.db")
    bar_path = str(tmpdir / "bar.db")
    db = Database(foo_path)
    with db.conn:
        db["foo"].insert({"id": 1, "text": "foo"})
    db2 = Database(bar_path)
    with db2.conn:
        db2["bar"].insert({"id": 1, "text": "bar"})
    # This attaches bar.db on the in-process connection only; the CLI
    # invocation below is given its own --attach option.
    db.attach("bar", bar_path)
    sql = "select * from foo union all select * from bar.bar"
    result = CliRunner().invoke(
        cli.cli,
        [foo_path, "--attach", "bar", bar_path, sql],
        catch_exceptions=False,
    )
    # Check the exit code first so a failing command reports its own output
    # instead of an opaque JSON decoding error.
    assert result.exit_code == 0, result.output
    assert json.loads(result.output) == [
        {"id": 1, "text": "foo"},
        {"id": 1, "text": "bar"},
    ]


def test_csv_insert_bom(tmpdir):
    """A UTF-8 BOM survives with --encoding utf-8 but is dropped without it."""
    db_path = str(tmpdir / "test.db")
    bom_csv_path = str(tmpdir / "bom.csv")
    with open(bom_csv_path, "wb") as fp:
        fp.write(b"\xef\xbb\xbfname,age\nCleo,5")
    runner = CliRunner()
    # Explicit encoding: the BOM ends up inside the first column name.
    with_encoding = runner.invoke(
        cli.cli,
        [
            "insert",
            db_path,
            "broken",
            bom_csv_path,
            "--encoding",
            "utf-8",
            "--csv",
            "--no-detect-types",
        ],
        catch_exceptions=False,
    )
    assert with_encoding.exit_code == 0
    # No encoding option: the column name comes out clean.
    without_encoding = runner.invoke(
        cli.cli,
        ["insert", db_path, "fixed", bom_csv_path, "--csv", "--no-detect-types"],
        catch_exceptions=False,
    )
    assert without_encoding.exit_code == 0
    cursor = Database(db_path).execute("select name, sql from sqlite_master")
    assert cursor.fetchall() == [
        ("broken", 'CREATE TABLE "broken" (\n   "\ufeffname" TEXT,\n   "age" TEXT\n)'),
        ("fixed", 'CREATE TABLE "fixed" (\n   "name" TEXT,\n   "age" TEXT\n)'),
    ]


@pytest.mark.parametrize("option", (None, "-d", "--detect-types"))
def test_insert_detect_types(tmpdir, option):
    """Insert detects column types by default and with -d/--detect-types."""
    db_path = str(tmpdir / "test.db")
    csv_text = "name,age,weight\nCleo,6,45.5\nDori,1,3.5"
    cli_args = ["insert", db_path, "creatures", "-", "--csv"]
    if option:
        cli_args.append(option)
    outcome = CliRunner().invoke(
        cli.cli,
        cli_args,
        catch_exceptions=False,
        input=csv_text,
    )
    assert outcome.exit_code == 0
    stored = list(Database(db_path)["creatures"].rows)
    assert stored == [
        {"name": "Cleo", "age": 6, "weight": 45.5},
        {"name": "Dori", "age": 1, "weight": 3.5},
    ]


@pytest.mark.parametrize("option", (None, "-d", "--detect-types"))
def test_upsert_detect_types(tmpdir, option):
    """Upsert detects column types by default and with -d/--detect-types."""
    db_path = str(tmpdir / "test.db")
    csv_text = "id,name,age,weight\n1,Cleo,6,45.5\n2,Dori,1,3.5"
    cli_args = ["upsert", db_path, "creatures", "-", "--csv", "--pk", "id"]
    if option:
        cli_args.append(option)
    outcome = CliRunner().invoke(
        cli.cli,
        cli_args,
        catch_exceptions=False,
        input=csv_text,
    )
    assert outcome.exit_code == 0
    stored = list(Database(db_path)["creatures"].rows)
    assert stored == [
        {"id": 1, "name": "Cleo", "age": 6, "weight": 45.5},
        {"id": 2, "name": "Dori", "age": 1, "weight": 3.5},
    ]


def test_csv_detect_types_creates_real_columns(tmpdir):
    """A default CSV import gives float values a REAL column."""
    db_path = str(tmpdir / "test.db")
    csv_text = "name,age,weight\nCleo,6,45.5\nDori,1,3.5"
    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "creatures", "-", "--csv"],
        catch_exceptions=False,
        input=csv_text,
    )
    assert outcome.exit_code == 0
    # Integer-looking "age" becomes INTEGER, decimal "weight" becomes REAL.
    expected_schema = (
        'CREATE TABLE "creatures" (\n'
        '   "name" TEXT,\n'
        '   "age" INTEGER,\n'
        '   "weight" REAL\n'
        ")"
    )
    assert Database(db_path)["creatures"].schema == expected_schema


def test_insert_no_detect_types(tmpdir):
    """With --no-detect-types, insert stores every CSV column as TEXT."""
    db_path = str(tmpdir / "test.db")
    csv_text = "name,age,weight\nCleo,6,45.5\nDori,1,3.5"
    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "creatures", "-", "--csv", "--no-detect-types"],
        catch_exceptions=False,
        input=csv_text,
    )
    assert outcome.exit_code == 0
    creatures = Database(db_path)["creatures"]
    # Numeric-looking values must come back as the original strings.
    assert list(creatures.rows) == [
        {"name": "Cleo", "age": "6", "weight": "45.5"},
        {"name": "Dori", "age": "1", "weight": "3.5"},
    ]
    expected_schema = (
        'CREATE TABLE "creatures" (\n'
        '   "name" TEXT,\n'
        '   "age" TEXT,\n'
        '   "weight" TEXT\n'
        ")"
    )
    assert creatures.schema == expected_schema


def test_upsert_no_detect_types(tmpdir):
    """With --no-detect-types, upsert stores every CSV column as TEXT."""
    db_path = str(tmpdir / "test.db")
    csv_text = "id,name,age,weight\n1,Cleo,6,45.5\n2,Dori,1,3.5"
    cli_args = ["upsert", db_path, "creatures", "-", "--csv"]
    cli_args += ["--pk", "id", "--no-detect-types"]
    outcome = CliRunner().invoke(
        cli.cli,
        cli_args,
        catch_exceptions=False,
        input=csv_text,
    )
    assert outcome.exit_code == 0
    creatures = Database(db_path)["creatures"]
    # Even the primary key column is kept as text.
    assert list(creatures.rows) == [
        {"id": "1", "name": "Cleo", "age": "6", "weight": "45.5"},
        {"id": "2", "name": "Dori", "age": "1", "weight": "3.5"},
    ]
    expected_schema = (
        'CREATE TABLE "creatures" (\n'
        '   "id" TEXT PRIMARY KEY,\n'
        '   "name" TEXT,\n'
        '   "age" TEXT,\n'
        '   "weight" TEXT\n'
        ")"
    )
    assert creatures.schema == expected_schema


def test_integer_overflow_error(tmpdir):
    """An integer too large for SQLite exits 1 and reports the SQL and parameters."""
    db_path = str(tmpdir / "test.db")
    payload = json.dumps({"bignumber": 34223049823094832094802398430298048240})
    # Exceptions are deliberately left to the runner here: the test asserts on
    # the error text and exit code that the CLI produces.
    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "items", "-"],
        input=payload,
    )
    assert outcome.exit_code == 1
    expected_output = (
        "Error: Python int too large to convert to SQLite INTEGER\n\n"
        'sql = INSERT INTO "items" ("bignumber") VALUES (?)\n'
        "parameters = [34223049823094832094802398430298048240]\n"
    )
    assert outcome.output == expected_output


def test_python_dash_m():
    """Tool can be run using python -m sqlite_utils"""
    command = [sys.executable, "-m", "sqlite_utils", "--help"]
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    assert completed.returncode == 0
    assert b"Commands for interacting with a SQLite database" in completed.stdout


@pytest.mark.parametrize("enable_wal", (False, True))
def test_create_database(tmpdir, enable_wal):
    """create-database writes a SQLite file, in WAL mode when --enable-wal is given."""
    db_path = tmpdir / "test.db"
    assert not db_path.exists()
    cli_args = ["create-database", str(db_path)]
    if enable_wal:
        cli_args += ["--enable-wal"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0, outcome.output
    assert db_path.exists()
    # The first 16 bytes must be the SQLite file header string.
    header = db_path.read_binary()[:16]
    assert header == b"SQLite format 3\x00"
    expected_mode = "wal" if enable_wal else "delete"
    assert Database(str(db_path)).journal_mode == expected_mode


@pytest.mark.parametrize(
    "options,expected",
    (
        (
            [],
            [
                {"tbl": "two_indexes", "idx": "idx_two_indexes_species", "stat": "1 1"},
                {"tbl": "two_indexes", "idx": "idx_two_indexes_name", "stat": "1 1"},
                {"tbl": "one_index", "idx": "idx_one_index_name", "stat": "1 1"},
            ],
        ),
        (
            ["one_index"],
            [
                {"tbl": "one_index", "idx": "idx_one_index_name", "stat": "1 1"},
            ],
        ),
        (
            ["idx_two_indexes_name"],
            [
                {"tbl": "two_indexes", "idx": "idx_two_indexes_name", "stat": "1 1"},
            ],
        ),
    ),
)
def test_analyze(tmpdir, options, expected):
    """analyze fills sqlite_stat1 for everything, or for a named table or index."""
    db_path = str(tmpdir / "test.db")
    db = Database(db_path)
    single = db["one_index"]
    single.insert({"id": 1, "name": "Cleo"}, pk="id")
    single.create_index(["name"])
    double = db["two_indexes"]
    double.insert({"id": 1, "name": "Cleo", "species": "dog"}, pk="id")
    double.create_index(["name"])
    double.create_index(["species"])
    outcome = CliRunner().invoke(cli.cli, ["analyze", db_path, *options])
    assert outcome.exit_code == 0
    statistics = list(db["sqlite_stat1"].rows)
    assert statistics == expected


def test_rename_table(tmpdir):
    """rename-table errors on a missing table unless --ignore, and keeps columns."""
    db_path = str(tmpdir / "test.db")
    db = Database(db_path)
    db["one"].insert({"id": 1, "name": "Cleo"}, pk="id")
    runner = CliRunner()

    # A table that does not exist is reported as an error...
    missing = runner.invoke(
        cli.cli,
        ["rename-table", db_path, "missing", "two"],
        catch_exceptions=False,
    )
    assert missing.exit_code == 1
    assert missing.output == (
        'Error: Table "missing" could not be renamed. ' "no such table: missing\n"
    )

    # ...unless --ignore is passed.
    ignored = runner.invoke(
        cli.cli,
        ["rename-table", db_path, "missing", "two", "--ignore"],
        catch_exceptions=False,
    )
    assert ignored.exit_code == 0

    # Renaming an existing table leaves its columns unchanged.
    columns_before = db["one"].columns_dict
    renamed = runner.invoke(
        cli.cli,
        ["rename-table", db_path, "one", "two"],
        catch_exceptions=False,
    )
    assert renamed.exit_code == 0
    assert db["two"].columns_dict == columns_before


def test_duplicate_table(tmpdir):
    """duplicate errors on a missing table unless --ignore, and copies columns and rows."""
    db_path = str(tmpdir / "test.db")
    db = Database(db_path)
    db["one"].insert({"id": 1, "name": "Cleo"}, pk="id")
    runner = CliRunner()

    # A source table that does not exist is reported as an error...
    missing = runner.invoke(
        cli.cli,
        ["duplicate", db_path, "missing", "two"],
        catch_exceptions=False,
    )
    assert missing.exit_code == 1
    assert missing.output == 'Error: Table "missing" does not exist\n'

    # ...unless --ignore is passed.
    ignored = runner.invoke(
        cli.cli,
        ["duplicate", db_path, "missing", "two", "--ignore"],
        catch_exceptions=False,
    )
    assert ignored.exit_code == 0

    # Duplicating an existing table copies both its columns and its rows.
    duplicated = runner.invoke(
        cli.cli,
        ["duplicate", db_path, "one", "two"],
        catch_exceptions=False,
    )
    assert duplicated.exit_code == 0
    source, copy = db["one"], db["two"]
    assert source.columns_dict == copy.columns_dict
    assert list(source.rows) == list(copy.rows)


@pytest.mark.skipif(not _has_compiled_ext(), reason="Requires compiled ext.c")
@pytest.mark.parametrize(
    "entrypoint,should_pass,should_fail",
    (
        # Single-element tuples need the trailing comma: ("b") is just the
        # string "b", which only iterated correctly because the function
        # names happen to be one character long.
        (None, ("a",), ("b", "c")),
        ("sqlite3_ext_b_init", ("b",), ("a", "c")),
        ("sqlite3_ext_c_init", ("c",), ("a", "b")),
    ),
)
def test_load_extension(entrypoint, should_pass, should_fail):
    """--load-extension exposes only the functions of the chosen entrypoint.

    ``entrypoint`` is appended to the extension path as ``path:entrypoint``
    when given. Each function in ``should_pass`` must be callable (exit code
    0) and each function in ``should_fail`` must not be (exit code 1).
    """
    ext = COMPILED_EXTENSION_PATH
    if entrypoint:
        ext += ":" + entrypoint
    for functions, expected_exit_code in ((should_pass, 0), (should_fail, 1)):
        for func in functions:
            result = CliRunner().invoke(
                cli.cli,
                ["memory", f"select {func}()", "--load-extension", ext],
                catch_exceptions=False,
            )
            assert result.exit_code == expected_exit_code, result.output


@pytest.mark.parametrize("strict", (False, True))
def test_create_table_strict(strict):
    """create-table honours --strict on databases that support strict tables."""
    cli_args = ["create-table", "test.db", "items", "id", "integer", "w", "float"]
    if strict:
        cli_args.append("--strict")
    runner = CliRunner()
    with runner.isolated_filesystem():
        db = Database("test.db")
        outcome = runner.invoke(cli.cli, cli_args)
        assert outcome.exit_code == 0
        items = db["items"]
        # The strict flag is only checked when the database supports strict tables.
        assert (items.strict == strict) or (not db.supports_strict)
        # Should have a floating point column
        assert items.columns_dict == {"id": int, "w": float}


@pytest.mark.parametrize("method", ("insert", "upsert"))
@pytest.mark.parametrize("strict", (False, True))
def test_insert_upsert_strict(tmpdir, method, strict):
    """insert and upsert honour --strict on databases that support strict tables."""
    db_path = str(tmpdir / "test.db")
    cli_args = [method, db_path, "items", "-", "--csv", "--pk", "id"]
    if strict:
        cli_args.append("--strict")
    outcome = CliRunner().invoke(cli.cli, cli_args, input="id\n1")
    assert outcome.exit_code == 0
    db = Database(db_path)
    # The strict flag is only checked when the database supports strict tables.
    assert (db["items"].strict == strict) or (not db.supports_strict)
